#   Licensed to the Apache Software Foundation (ASF) under one
#   or more contributor license agreements.  See the NOTICE file
#   distributed with this work for additional information
#   regarding copyright ownership.  The ASF licenses this file
#   to you under the Apache License, Version 2.0 (the
#   "License"); you may not use this file except in compliance
#   with the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import calendar
import datetime
import time

import apache_beam as beam
from apache_beam.transforms import window

from log_elements import LogElements


class Event:
    """A simple record describing one event and the date it is associated with.

    Attributes:
        id: Identifier of the event (the pipeline in this file passes strings).
        event: Name of the event, e.g. ``'book-order'``.
        date: Date of the event (the pipeline in this file passes
            ``datetime.date`` values).
    """

    def __init__(self, id, event, date):
        # Values are stored as given; no validation or conversion is applied.
        self.id, self.event, self.date = id, event, date

    def __str__(self) -> str:
        """Return the ``Event(<id>, <event>, <date>)`` text form."""
        return 'Event({}, {}, {})'.format(self.id, self.event, self.date)


class AddTimestampDoFn(beam.DoFn):
    """Attach an event-time timestamp, derived from ``element.date``, to each element."""

    def process(self, element, **kwargs):
        """Emit ``element`` wrapped in a ``TimestampedValue``.

        Args:
            element: An object whose ``date`` attribute provides
                ``timetuple()`` (e.g. ``datetime.date``).
            **kwargs: Accepted and ignored.

        Yields:
            ``window.TimestampedValue`` pairing the unchanged element with the
            Unix timestamp (seconds since the epoch) of ``element.date``.
        """
        # The date is interpreted as UTC. time.mktime() would interpret it in
        # the local time zone of whichever machine executes this DoFn, making
        # the emitted timestamps differ from host to host.
        unix_timestamp = calendar.timegm(element.date.timetuple())
        yield window.TimestampedValue(element, unix_timestamp)


# Build the pipeline: create five sample events, attach a timestamp taken from
# each event's date, then hand the elements to LogElements (presumably logging
# each element together with its timestamp -- see log_elements).
#
# Using the pipeline as a context manager runs it when the block exits and
# waits for it to finish, instead of calling p.run() and discarding the
# result without waiting for completion.
with beam.Pipeline() as p:
    (p
     | beam.Create([
         Event('1', 'book-order', datetime.date(2020, 3, 4)),
         Event('2', 'pencil-order', datetime.date(2020, 3, 5)),
         Event('3', 'paper-order', datetime.date(2020, 3, 6)),
         Event('4', 'pencil-order', datetime.date(2020, 3, 7)),
         Event('5', 'book-order', datetime.date(2020, 3, 8)),
     ])
     | beam.ParDo(AddTimestampDoFn())
     | LogElements(with_timestamp=True))
